"""
ElevenLabs MCP Server

⚠️ IMPORTANT: This server provides access to ElevenLabs API endpoints which may incur costs.
Each tool that makes an API call is marked with a cost warning. Please follow these guidelines:

1. Only use tools when explicitly requested by the user
2. For tools that generate audio, consider the length of the text as it affects costs
3. Some operations like voice cloning or text-to-voice may have higher costs

Tools without cost warnings in their description are free to use as they only read existing data.
"""

import httpx
import os
import base64
from datetime import datetime
from io import BytesIO
from typing import Literal, Union
from dotenv import load_dotenv
from mcp.server.fastmcp import FastMCP
from mcp.types import (
    TextContent,
    Resource,
    EmbeddedResource,
)
from elevenlabs.client import ElevenLabs
from elevenlabs.types import MusicPrompt
from elevenlabs_mcp.model import McpVoice, McpModel, McpLanguage
from elevenlabs_mcp.utils import (
    make_error,
    make_output_path,
    make_output_file,
    handle_input_file,
    parse_conversation_transcript,
    handle_large_text,
    parse_location,
    get_mime_type,
    handle_output_mode,
    handle_multiple_files_output_mode,
    get_output_mode_description,
)

from elevenlabs_mcp.convai import create_conversation_config, create_platform_settings
from elevenlabs.types.knowledge_base_locator import KnowledgeBaseLocator

from elevenlabs.play import play
from elevenlabs_mcp import __version__
from pathlib import Path

# ---------------------------------------------------------------------------
# Module-level configuration. Everything below runs at import time, so a
# missing API key or an invalid output mode aborts the import with ValueError.
# ---------------------------------------------------------------------------

# Load settings via python-dotenv before reading the environment.
load_dotenv()
api_key = os.getenv("ELEVENLABS_API_KEY")
# Optional base directory handed to make_output_path() by the tools below.
base_path = os.getenv("ELEVENLABS_MCP_BASE_PATH")
# Normalised (trimmed, lower-cased) output mode; defaults to "files".
output_mode = os.getenv("ELEVENLABS_MCP_OUTPUT_MODE", "files").strip().lower()
# Voice used when a tool call names neither a voice_id nor a voice_name.
DEFAULT_VOICE_ID = os.getenv("ELEVENLABS_DEFAULT_VOICE_ID", "cgSgspJ2msm6clMCkdW9")

# Fail fast on configuration errors instead of failing on the first tool call.
if output_mode not in {"files", "resources", "both"}:
    raise ValueError("ELEVENLABS_MCP_OUTPUT_MODE must be one of: 'files', 'resources', 'both'")
if not api_key:
    raise ValueError("ELEVENLABS_API_KEY environment variable is required")

# Value passed to the SDK client as its base_url; derived from the optional
# ELEVENLABS_API_RESIDENCY setting by parse_location().
origin = parse_location(os.getenv("ELEVENLABS_API_RESIDENCY"))

# Custom httpx client so every request carries an MCP-specific User-Agent
# header that includes this package's version.
custom_client = httpx.Client(
    headers={
        "User-Agent": f"ElevenLabs-MCP/{__version__}",
    },
)

# Shared SDK client and the FastMCP server instance the tools register on.
client = ElevenLabs(api_key=api_key, httpx_client=custom_client, base_url=origin)
mcp = FastMCP("ElevenLabs")


def _diarized_word_field(word, field):
    """Read ``field`` from a word that is either an object or a dict."""
    if hasattr(word, field):
        return getattr(word, field)
    if isinstance(word, dict):
        return word.get(field)
    return None


def _find_diarized_words(transcription):
    """Locate the list of word entries on a transcription response.

    Prefers a ``words`` attribute; otherwise scans the instance ``__dict__``
    for a ``words`` key or a non-empty list whose first element carries a
    ``speaker_id``. Returns ``None`` when nothing suitable is found.
    """
    if hasattr(transcription, "words"):
        return transcription.words
    if not hasattr(transcription, "__dict__"):
        return None

    for key, value in transcription.__dict__.items():
        if key == "words":
            return value
        if isinstance(value, list) and value:
            first = value[0]
            if hasattr(first, "__dict__"):
                if hasattr(first, "speaker_id"):
                    return value
            elif isinstance(first, dict) and "speaker_id" in first:
                return value
    return None


def format_diarized_transcript(transcription) -> str:
    """Format a diarized transcription as speaker-labelled paragraphs.

    Consecutive words from the same speaker are joined with single spaces
    into one paragraph prefixed by the upper-cased speaker id (underscores
    replaced by spaces), e.g. ``SPEAKER 0: Hello there``. Paragraphs are
    separated by a blank line. Words typed ``"spacing"`` and words without
    a speaker id or text are skipped.

    Args:
        transcription: Response object exposing ``text`` and, ideally, a
            list of word entries (objects or dicts) with ``speaker_id``,
            ``text`` and optionally ``type``.

    Returns:
        The speaker-labelled transcript, or ``transcription.text`` when no
        word entries are available, none of them yield a labelled line, or
        formatting fails for any reason.
    """
    try:
        words = _find_diarized_words(transcription)
        if not words:
            return transcription.text

        # Each segment is (speaker_id, [word texts]) for one uninterrupted turn.
        segments = []
        for word in words:
            speaker = _diarized_word_field(word, "speaker_id")
            text = _diarized_word_field(word, "text")
            if not speaker or not text:
                continue
            if _diarized_word_field(word, "type") == "spacing":
                continue

            if segments and segments[-1][0] == speaker:
                segments[-1][1].append(text.strip())
            else:
                segments.append((speaker, [text.strip()]))

        formatted_lines = []
        for speaker, texts in segments:
            speaker_label = speaker.upper().replace("_", " ")
            formatted_lines.append(f"{speaker_label}: {' '.join(texts)}")

        # Words were present but none carried usable speaker/text data:
        # return the plain transcript rather than an empty string.
        if not formatted_lines:
            return transcription.text

        return "\n\n".join(formatted_lines)

    except Exception:
        # Deliberate best effort: any unexpected response shape falls back
        # to the unformatted transcript instead of failing the tool call.
        return transcription.text
@mcp.resource("elevenlabs://{filename}")
def get_elevenlabs_resource(filename: str) -> Resource:
    """
    Resource handler for ElevenLabs generated files.
    """
    # The docstring above is intentionally unchanged: the decorator is given
    # no explicit description, so the docstring may be what clients see.
    #
    # Resolves `filename` to a file on disk, reads it, and returns it as a
    # Resource: decoded UTF-8 text when the MIME type is text/*, otherwise
    # base64-encoded bytes. Raises FileNotFoundError when the file is missing
    # or unreadable; other failures are reported through make_error().
    requested = Path(filename)
    base_dir = make_output_path(None, base_path)

    if requested.is_absolute():
        # NOTE(review): absolute paths skip the containment check applied to
        # relative paths below, so any readable file can be requested this
        # way. Confirm this is intended before exposing to untrusted clients.
        file_path = requested.resolve()
    else:
        allowed_root = base_dir.resolve()
        file_path = (allowed_root / requested).resolve()
        try:
            # relative_to() raises ValueError when the resolved path escaped
            # the base directory (e.g. through ".." components).
            file_path.relative_to(allowed_root)
        except ValueError:
            make_error(
                f"Resource path ({file_path}) is outside of allowed directory {allowed_root}"
            )

    if not file_path.exists():
        raise FileNotFoundError(f"Resource file not found: {filename}")

    # Read the file and determine MIME type
    try:
        with open(file_path, "rb") as f:
            file_data = f.read()
    except IOError as e:
        # Keep the exception type callers already see, but chain the cause
        # so the underlying OS error is not lost from the traceback.
        raise FileNotFoundError(
            f"Failed to read resource file {filename}: {e}"
        ) from e

    mime_type = get_mime_type(file_path.suffix.lstrip("."))
    resource_uri = f"elevenlabs://{filename}"

    # For text files, return text content
    if mime_type.startswith("text/"):
        try:
            text_content = file_data.decode("utf-8")
            return Resource(uri=resource_uri, mimeType=mime_type, text=text_content)
        except UnicodeDecodeError:
            make_error(
                f"Failed to decode text resource {filename} as UTF-8; MIME type {mime_type} may be incorrect or file is corrupt"
            )

    # For binary files, return base64 encoded data
    base64_data = base64.b64encode(file_data).decode("utf-8")
    return Resource(uri=resource_uri, mimeType=mime_type, data=base64_data)


@mcp.tool(
    description=f"""Convert text to speech with a given voice. {get_output_mode_description(output_mode)}.
    
    Only one of voice_id or voice_name can be provided. If none are provided, the default voice will be used.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

     Args:
        text (str): The text to convert to speech.
        voice_name (str, optional): The name of the voice to use.
        voice_id (str, optional): The ID of the voice to use. Cannot be combined with voice_name.
        model_id (str, optional): The model ID to use for speech synthesis. Options include:
            - eleven_multilingual_v2: High quality multilingual model (29 languages)
            - eleven_flash_v2_5: Fastest model with ultra-low latency (32 languages)
            - eleven_turbo_v2_5: Balanced quality and speed (32 languages)
            - eleven_flash_v2: Fast English-only model
            - eleven_turbo_v2: Balanced English-only model
            - eleven_monolingual_v1: Legacy English model
            Defaults to eleven_multilingual_v2 or environment variable ELEVENLABS_MODEL_ID.
        stability (float, optional): Stability of the generated audio. Determines how stable the voice is and the randomness between each generation. Lower values introduce broader emotional range for the voice. Higher values can result in a monotonous voice with limited emotion. Range is 0 to 1.
        similarity_boost (float, optional): Similarity boost of the generated audio. Determines how closely the AI should adhere to the original voice when attempting to replicate it. Range is 0 to 1.
        style (float, optional): Style of the generated audio. Determines the style exaggeration of the voice. This setting attempts to amplify the style of the original speaker. It does consume additional computational resources and might increase latency if set to anything other than 0. Range is 0 to 1.
        use_speaker_boost (bool, optional): Use speaker boost of the generated audio. This setting boosts the similarity to the original speaker. Using this setting requires a slightly higher computational load, which in turn increases latency.
        speed (float, optional): Speed of the generated audio. Controls the speed of the generated speech. Values range from 0.7 to 1.2, with 1.0 being the default speed. Lower values create slower, more deliberate speech while higher values produce faster-paced speech. Extreme values can impact the quality of the generated speech. Range is 0.7 to 1.2.
        output_directory (str, optional): Directory where files should be saved (only used when saving files).
            Defaults to $HOME/Desktop if not provided.
        language: ISO 639-1 language code for the voice.
        output_format (str, optional): Output format of the generated audio. Formatted as codec_sample_rate_bitrate. So an mp3 with 22.05kHz sample rate at 32kbs is represented as mp3_22050_32. MP3 with 192kbps bitrate requires you to be subscribed to Creator tier or above. PCM with 44.1kHz sample rate requires you to be subscribed to Pro tier or above. Note that the μ-law format (sometimes written mu-law, often approximated as u-law) is commonly used for Twilio audio inputs.
            Defaults to "mp3_44100_128". Must be one of:
            mp3_22050_32
            mp3_44100_32
            mp3_44100_64
            mp3_44100_96
            mp3_44100_128
            mp3_44100_192
            pcm_8000
            pcm_16000
            pcm_22050
            pcm_24000
            pcm_44100
            ulaw_8000
            alaw_8000
            opus_48000_32
            opus_48000_64
            opus_48000_96
            opus_48000_128
            opus_48000_192

    Returns:
        Text content with file path or MCP resource with audio data, depending on output mode.
    """
)
def text_to_speech(
    text: str,
    voice_name: str | None = None,
    output_directory: str | None = None,
    voice_id: str | None = None,
    stability: float = 0.5,
    similarity_boost: float = 0.75,
    style: float = 0,
    use_speaker_boost: bool = True,
    speed: float = 1.0,
    language: str = "en",
    output_format: str = "mp3_44100_128",
    model_id: str | None = None,
) -> Union[TextContent, EmbeddedResource]:
    """Synthesize ``text`` with the selected voice and return the audio.

    The voice is resolved from ``voice_id`` or an exact ``voice_name`` match;
    with neither, ``DEFAULT_VOICE_ID`` is used. When ``model_id`` is not
    given, the ``ELEVENLABS_MODEL_ID`` environment variable is honoured (as
    the tool description promises) before the built-in default is applied.

    Errors (empty text, both voice selectors given, unknown voice name) are
    reported through ``make_error``.

    Returns:
        Whatever ``handle_output_mode`` produces for the configured output
        mode.
    """
    if text == "":
        make_error("Text is required.")

    if voice_id is not None and voice_name is not None:
        make_error("voice_id and voice_name cannot both be provided.")

    voice = None
    if voice_id is not None:
        voice = client.voices.get(voice_id=voice_id)
    elif voice_name is not None:
        voices = client.voices.search(search=voice_name)
        if not voices.voices:
            make_error("No voices found with that name.")
        # The search may return partial matches; require an exact name match.
        voice = next((v for v in voices.voices if v.name == voice_name), None)
        if voice is None:
            make_error(f"Voice with name: {voice_name} does not exist.")

    voice_id = voice.voice_id if voice else DEFAULT_VOICE_ID

    output_path = make_output_path(output_directory, base_path)
    # NOTE(review): the file extension is always "mp3", even when
    # output_format selects pcm/ulaw/alaw/opus — confirm whether the
    # extension should follow the requested codec.
    output_file_name = make_output_file("tts", text, "mp3")

    if model_id is None:
        # Precedence: explicit argument > ELEVENLABS_MODEL_ID > built-in
        # default. The built-in default switches to eleven_flash_v2_5 for
        # "hu", "no" and "vi".
        model_id = os.getenv("ELEVENLABS_MODEL_ID") or (
            "eleven_flash_v2_5"
            if language in ["hu", "no", "vi"]
            else "eleven_multilingual_v2"
        )

    audio_data = client.text_to_speech.convert(
        text=text,
        voice_id=voice_id,
        model_id=model_id,
        output_format=output_format,
        voice_settings={
            "stability": stability,
            "similarity_boost": similarity_boost,
            "style": style,
            "use_speaker_boost": use_speaker_boost,
            "speed": speed,
        },
    )
    # convert() yields the audio in chunks; collect them into one payload.
    audio_bytes = b"".join(audio_data)

    # Handle different output modes. "{file_path}" is left as a literal
    # placeholder (escaped braces) for handle_output_mode.
    success_message = f"Success. File saved as: {{file_path}}. Voice used: {voice.name if voice else DEFAULT_VOICE_ID}"
    return handle_output_mode(
        audio_bytes, output_path, output_file_name, output_mode, success_message
    )


@mcp.tool(
    description=f"""Transcribe speech from an audio file. When save_transcript_to_file=True: {get_output_mode_description(output_mode)}. When return_transcript_to_client_directly=True, always returns text directly regardless of output mode.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

    Args:
        input_file_path: Path to the audio file to transcribe
        language_code: ISO 639-3 language code for transcription. If not provided, the language will be detected automatically.
        diarize: Whether to diarize the audio file. If True, which speaker is currently speaking will be annotated in the transcription.
        save_transcript_to_file: Whether to save the transcript to a file.
        return_transcript_to_client_directly: Whether to return the transcript to the client directly.
        output_directory: Directory where files should be saved (only used when saving files).
            Defaults to $HOME/Desktop if not provided.

    Returns:
        TextContent containing the transcription or MCP resource with transcript data.
    """
)
def speech_to_text(
    input_file_path: str,
    language_code: str | None = None,
    diarize: bool = False,
    save_transcript_to_file: bool = True,
    return_transcript_to_client_directly: bool = False,
    output_directory: str | None = None,
) -> Union[TextContent, EmbeddedResource]:
    """Transcribe an audio file with the ``scribe_v1`` model.

    At least one of ``save_transcript_to_file`` and
    ``return_transcript_to_client_directly`` must be true; otherwise the
    problem is reported through ``make_error``. When both are true the
    transcript is returned directly and nothing is handed to
    ``handle_output_mode``.

    Returns:
        ``TextContent`` with the transcript when returned directly,
        otherwise the result of ``handle_output_mode`` for the transcript
        bytes.
    """
    if not save_transcript_to_file and not return_transcript_to_client_directly:
        make_error("Must save transcript to file or return it to the client directly.")

    file_path = handle_input_file(input_file_path)
    if save_transcript_to_file:
        output_path = make_output_path(output_directory, base_path)
        output_file_name = make_output_file("stt", file_path.name, "txt")

    with file_path.open("rb") as f:
        audio_bytes = f.read()

    # An empty string means "not provided"; pass None to the API in that case.
    language_code = language_code or None

    transcription = client.speech_to_text.convert(
        model_id="scribe_v1",
        file=audio_bytes,
        language_code=language_code,
        enable_logging=True,
        diarize=diarize,
        tag_audio_events=True,
    )

    # Format transcript with speaker identification if diarization was enabled
    if diarize:
        formatted_transcript = format_diarized_transcript(transcription)
    else:
        formatted_transcript = transcription.text

    if return_transcript_to_client_directly:
        return TextContent(type="text", text=formatted_transcript)

    if save_transcript_to_file:
        transcript_bytes = formatted_transcript.encode("utf-8")

        # Pass the literal "{file_path}" placeholder (the same one
        # text_to_speech sends) so the message refers to the saved
        # transcript. The previous f-string interpolated the local
        # `file_path`, which is the INPUT audio file.
        success_message = "Transcription saved to {file_path}"
        return handle_output_mode(
            transcript_bytes,
            output_path,
            output_file_name,
            output_mode,
            success_message,
        )

    # This should not be reached due to validation at the start of the function
    return TextContent(type="text", text="No output mode specified")


@mcp.tool(
    description=f"""Convert text description of a sound effect to sound effect with a given duration. {get_output_mode_description(output_mode)}.
    
    Duration must be between 0.5 and 5 seconds.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

    Args:
        text: Text description of the sound effect
        duration_seconds: Duration of the sound effect in seconds
        output_directory: Directory where files should be saved (only used when saving files).
            Defaults to $HOME/Desktop if not provided.
        loop: Whether to loop the sound effect. Defaults to False.
        output_format (str, optional): Output format of the generated audio. Formatted as codec_sample_rate_bitrate. So an mp3 with 22.05kHz sample rate at 32kbs is represented as mp3_22050_32. MP3 with 192kbps bitrate requires you to be subscribed to Creator tier or above. PCM with 44.1kHz sample rate requires you to be subscribed to Pro tier or above. Note that the μ-law format (sometimes written mu-law, often approximated as u-law) is commonly used for Twilio audio inputs.
            Defaults to "mp3_44100_128". Must be one of:
            mp3_22050_32
            mp3_44100_32
            mp3_44100_64
            mp3_44100_96
            mp3_44100_128
            mp3_44100_192
            pcm_8000
            pcm_16000
            pcm_22050
            pcm_24000
            pcm_44100
            ulaw_8000
            alaw_8000
            opus_48000_32
            opus_48000_64
            opus_48000_96
            opus_48000_128
            opus_48000_192
    """
)
def text_to_sound_effects(
    text: str,
    duration_seconds: float = 2.0,
    output_directory: str | None = None,
    output_format: str = "mp3_44100_128",
    loop: bool = False,
) -> Union[TextContent, EmbeddedResource]:
    """Generate a sound effect from a text description.

    Durations outside 0.5–5 seconds are reported through ``make_error``.
    The generated audio is handed to ``handle_output_mode`` under a file
    name built with the "sfx" prefix and an "mp3" extension.
    """
    too_short = duration_seconds < 0.5
    too_long = duration_seconds > 5
    if too_short or too_long:
        make_error("Duration must be between 0.5 and 5 seconds")

    target_dir = make_output_path(output_directory, base_path)
    target_name = make_output_file("sfx", text, "mp3")

    # The SDK yields the audio in chunks; merge them into a single payload.
    chunks = client.text_to_sound_effects.convert(
        text=text,
        output_format=output_format,
        duration_seconds=duration_seconds,
        loop=loop,
    )
    payload = b"".join(chunks)

    # Deliver according to the configured output mode.
    return handle_output_mode(payload, target_dir, target_name, output_mode)


@mcp.tool(
    description="""
    Search for existing voices, a voice that has already been added to the user's ElevenLabs voice library.
    Searches in name, description, labels and category.

    Args:
        search: Search term to filter voices by. Searches in name, description, labels and category.
        sort: Which field to sort by. `created_at_unix` might not be available for older voices.
        sort_direction: Sort order, either ascending or descending.

    Returns:
        List of voices that match the search criteria.
    """
)
def search_voices(
    search: str | None = None,
    sort: Literal["created_at_unix", "name"] = "name",
    sort_direction: Literal["asc", "desc"] = "desc",
) -> list[McpVoice]:
    """Search the voice library and map each hit to an ``McpVoice``.

    Only the id, name and category of each voice are carried over.
    """
    result = client.voices.search(
        search=search,
        sort=sort,
        sort_direction=sort_direction,
    )

    matches = []
    for entry in result.voices:
        matches.append(
            McpVoice(id=entry.voice_id, name=entry.name, category=entry.category)
        )
    return matches


@mcp.tool(description="List all available models")
def list_models() -> list[McpModel]:
    """Return every model reported by the API as an ``McpModel``.

    Each model's languages are converted to ``McpLanguage`` entries.
    """
    models = []
    for entry in client.models.list():
        supported = []
        for lang in entry.languages:
            supported.append(
                McpLanguage(language_id=lang.language_id, name=lang.name)
            )
        models.append(
            McpModel(id=entry.model_id, name=entry.name, languages=supported)
        )
    return models


@mcp.tool(description="Get details of a specific voice")
def get_voice(voice_id: str) -> McpVoice:
    """Get details of a specific voice.

    Args:
        voice_id: ID of the voice to look up.

    Returns:
        An ``McpVoice`` with the voice's id, name and category. The
        fine-tuning state is included only when the API response carries
        fine-tuning information.
    """
    response = client.voices.get(voice_id=voice_id)

    details = {
        "id": response.voice_id,
        "name": response.name,
        "category": response.category,
    }
    # fine_tuning may be absent on the response; reading `.state` from None
    # would raise AttributeError. When it is missing, omit the field exactly
    # as search_voices does, so McpVoice applies its own default.
    fine_tuning = getattr(response, "fine_tuning", None)
    if fine_tuning is not None:
        details["fine_tuning_status"] = fine_tuning.state

    return McpVoice(**details)


@mcp.tool(
    description="""Create an instant voice clone of a voice using provided audio files.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.
    """
)
def voice_clone(
    name: str, files: list[str], description: str | None = None
) -> TextContent:
    """Create an instant voice clone from local audio files.

    Each entry of ``files`` is checked with ``handle_input_file`` and passed
    to the API as an absolute path string. Returns a text summary of the
    created voice.
    """
    sample_paths = []
    for sample in files:
        checked = handle_input_file(sample)
        sample_paths.append(str(checked.absolute()))

    voice = client.voices.ivc.create(
        name=name,
        description=description,
        files=sample_paths,
    )

    # The continuation lines keep their leading whitespace: it is part of
    # the text returned to the client.
    summary = f"""Voice cloned successfully: Name: {voice.name}
        ID: {voice.voice_id}
        Category: {voice.category}
        Description: {voice.description or "N/A"}"""
    return TextContent(type="text", text=summary)


@mcp.tool(
    description=f"""Isolate audio from a file. {get_output_mode_description(output_mode)}.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.
    """
)
def isolate_audio(
    input_file_path: str, output_directory: str | None = None
) -> Union[TextContent, EmbeddedResource]:
    """Run audio isolation on a local file and return the processed audio.

    The result is handed to ``handle_output_mode`` under a file name built
    with the "iso" prefix and an "mp3" extension.
    """
    source = handle_input_file(input_file_path)
    target_dir = make_output_path(output_directory, base_path)
    target_name = make_output_file("iso", source.name, "mp3")

    with source.open("rb") as stream:
        raw_audio = stream.read()

    # The SDK yields the isolated audio in chunks; merge them into one payload.
    isolated_chunks = client.audio_isolation.convert(audio=raw_audio)
    isolated_audio = b"".join(isolated_chunks)

    # Deliver according to the configured output mode.
    return handle_output_mode(isolated_audio, target_dir, target_name, output_mode)


@mcp.tool(
    description="Check the current subscription status. Could be used to measure the usage of the API."
)
def check_subscription() -> TextContent:
    """Return the current subscription as indented JSON text."""
    subscription = client.user.subscription.get()
    report = f"{subscription.model_dump_json(indent=2)}"
    return TextContent(type="text", text=report)


@mcp.tool(
    description="""Create a conversational AI agent with custom configuration.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

    Args:
        name: Name of the agent
        first_message: First message the agent will say i.e. "Hi, how can I help you today?"
        system_prompt: System prompt for the agent
        voice_id: ID of the voice to use for the agent
        language: ISO 639-1 language code for the agent
        llm: LLM to use for the agent
        temperature: Temperature for the agent. The lower the temperature, the more deterministic the agent's responses will be. Range is 0 to 1.
        max_tokens: Maximum number of tokens to generate.
        asr_quality: Quality of the ASR. `high` or `low`.
        model_id: ID of the ElevenLabs model to use for the agent.
        optimize_streaming_latency: Optimize streaming latency. Range is 0 to 4.
        stability: Stability for the agent. Range is 0 to 1.
        similarity_boost: Similarity boost for the agent. Range is 0 to 1.
        turn_timeout: Timeout for the agent to respond in seconds. Defaults to 7 seconds.
        max_duration_seconds: Maximum duration of a conversation in seconds. Defaults to 300 seconds (5 minutes).
        record_voice: Whether to record the agent's voice.
        retention_days: Number of days to retain the agent's data.
    """
)
def create_agent(
    name: str,
    first_message: str,
    system_prompt: str,
    voice_id: str | None = DEFAULT_VOICE_ID,
    language: str = "en",
    llm: str = "gemini-2.0-flash-001",
    temperature: float = 0.5,
    max_tokens: int | None = None,
    asr_quality: str = "high",
    model_id: str = "eleven_turbo_v2",
    optimize_streaming_latency: int = 3,
    stability: float = 0.5,
    similarity_boost: float = 0.8,
    turn_timeout: int = 7,
    max_duration_seconds: int = 300,
    record_voice: bool = True,
    retention_days: int = 730,
) -> TextContent:
    """Create a conversational AI agent and report its ID.

    The conversation-related arguments are bundled by
    ``create_conversation_config`` and the recording/retention arguments by
    ``create_platform_settings``; both are then sent to the agents API.

    The tool description now states the actual ``max_duration_seconds``
    default (300 seconds); it previously advertised 600.

    Returns:
        ``TextContent`` summarising the created agent, including the agent
        ID returned by the API.
    """
    conversation_config = create_conversation_config(
        language=language,
        system_prompt=system_prompt,
        llm=llm,
        first_message=first_message,
        temperature=temperature,
        max_tokens=max_tokens,
        asr_quality=asr_quality,
        voice_id=voice_id,
        model_id=model_id,
        optimize_streaming_latency=optimize_streaming_latency,
        stability=stability,
        similarity_boost=similarity_boost,
        turn_timeout=turn_timeout,
        max_duration_seconds=max_duration_seconds,
    )

    platform_settings = create_platform_settings(
        record_voice=record_voice,
        retention_days=retention_days,
    )

    response = client.conversational_ai.agents.create(
        name=name,
        conversation_config=conversation_config,
        platform_settings=platform_settings,
    )

    return TextContent(
        type="text",
        text=f"""Agent created successfully: Name: {name}, Agent ID: {response.agent_id}, System Prompt: {system_prompt}, Voice ID: {voice_id or "Default"}, Language: {language}, LLM: {llm}, You can use this agent ID for future interactions with the agent.""",
    )


@mcp.tool(
    description="""Add a knowledge base to ElevenLabs workspace. Allowed types are epub, pdf, docx, txt, html.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

    Args:
        agent_id: ID of the agent to add the knowledge base to.
        knowledge_base_name: Name of the knowledge base.
        url: URL of the knowledge base.
        input_file_path: Path to the file to add to the knowledge base.
        text: Text to add to the knowledge base.
    """
)
def add_knowledge_base_to_agent(
    agent_id: str,
    knowledge_base_name: str,
    url: str | None = None,
    input_file_path: str | None = None,
    text: str | None = None,
) -> TextContent:
    """Create a knowledge-base document and attach it to an agent.

    Exactly one of ``url``, ``input_file_path`` or ``text`` must be given;
    violations are reported through ``make_error``. The document is created
    from the URL, or uploaded as a file (raw text is wrapped in an in-memory
    "text.txt"). The agent's prompt configuration is then updated with a
    locator for the new document.

    Returns:
        ``TextContent`` confirming the document ID and the agent it was
        added to.
    """
    provided_params = [
        param for param in (url, input_file_path, text) if param is not None
    ]
    if len(provided_params) == 0:
        make_error("Must provide either a URL, a file, or text")
    if len(provided_params) > 1:
        make_error("Must provide exactly one of: URL, file, or text")

    documents = client.conversational_ai.knowledge_base.documents
    # Decide the locator type up front. The previous code inspected a local
    # `file` variable that was never assigned on the URL path, raising
    # UnboundLocalError after the document had already been created.
    locator_type = "url" if url is not None else "file"

    if url is not None:
        response = documents.create_from_url(
            name=knowledge_base_name,
            url=url,
        )
    elif text is not None:
        # Wrap the raw text in a named in-memory file for the upload.
        text_io = BytesIO(text.encode("utf-8"))
        text_io.name = "text.txt"
        text_io.content_type = "text/plain"
        response = documents.create_from_file(
            name=knowledge_base_name,
            file=text_io,
        )
    else:
        path = handle_input_file(
            file_path=input_file_path, audio_content_check=False
        )
        # Context manager closes the handle even if the upload fails.
        with open(path, "rb") as document_file:
            response = documents.create_from_file(
                name=knowledge_base_name,
                file=document_file,
            )

    agent = client.conversational_ai.agents.get(agent_id=agent_id)

    agent_config = agent.conversation_config.agent
    knowledge_base_list = (
        agent_config.get("prompt", {}).get("knowledge_base", []) if agent_config else []
    )
    knowledge_base_list.append(
        KnowledgeBaseLocator(
            type=locator_type,
            name=knowledge_base_name,
            id=response.id,
        )
    )

    # Write the extended list back only when the agent has a config to
    # update, creating the "prompt" section if it is missing.
    if agent_config and "prompt" not in agent_config:
        agent_config["prompt"] = {}
    if agent_config:
        agent_config["prompt"]["knowledge_base"] = knowledge_base_list

    client.conversational_ai.agents.update(
        agent_id=agent_id, conversation_config=agent.conversation_config
    )
    return TextContent(
        type="text",
        text=f"""Knowledge base created with ID: {response.id} and added to agent {agent_id} successfully.""",
    )


@mcp.tool(description="List all available conversational AI agents")
def list_agents() -> TextContent:
    """Summarise every conversational AI agent on the account.

    Returns:
        TextContent listing each agent as "name (ID: ...)", comma-separated,
        or a "No agents found." message when there are none.
    """
    listing = client.conversational_ai.agents.list()

    if not listing.agents:
        return TextContent(type="text", text="No agents found.")

    entries = []
    for agent in listing.agents:
        entries.append(f"{agent.name} (ID: {agent.agent_id})")
    agent_list = ",".join(entries)

    return TextContent(type="text", text=f"Available agents: {agent_list}")


@mcp.tool(description="Get details about a specific conversational AI agent")
def get_agent(agent_id: str) -> TextContent:
    """Look up one conversational AI agent and describe it.

    Args:
        agent_id: The ID of the agent to retrieve

    Returns:
        TextContent with the agent's name, ID, voice configuration and
        creation time (formatted from the agent's Unix creation timestamp).
    """
    response = client.conversational_ai.agents.get(agent_id=agent_id)

    # Report the voice only when the agent has a TTS configuration.
    voice_info = (
        f"Voice ID: {response.conversation_config.tts.voice_id}"
        if response.conversation_config.tts
        else "None"
    )

    details = f"Agent Details: Name: {response.name}, Agent ID: {response.agent_id}, Voice Configuration: {voice_info}, Created At: {datetime.fromtimestamp(response.metadata.created_at_unix_secs).strftime('%Y-%m-%d %H:%M:%S')}"
    return TextContent(type="text", text=details)


@mcp.tool(
    description="""Gets conversation with transcript. Returns: conversation details and full transcript. Use when: analyzing completed agent conversations.

    Args:
        conversation_id: The unique identifier of the conversation to retrieve, you can get the ids from the list_conversations tool.
    """
)
def get_conversation(conversation_id: str) -> TextContent:
    """Fetch one conversation and render it as text.

    The output contains the conversation's ID, status, agent ID, message
    count and transcript, followed by metadata (duration, start time) and
    the analysis summary when the response includes them. Any failure is
    reported through ``make_error``.
    """
    try:
        response = client.conversational_ai.conversations.get(conversation_id)

        # Only the rendered transcript is needed; the second value is unused.
        transcript, _ = parse_conversation_transcript(response.transcript)

        response_text = f"""Conversation Details:
ID: {response.conversation_id}
Status: {response.status}
Agent ID: {response.agent_id}
Message Count: {len(response.transcript)}

Transcript:
{transcript}"""

        metadata = response.metadata
        if metadata:
            # Field names differ between response shapes: try the primary
            # name, then the alternative, then fall back to "N/A".
            fallback_duration = getattr(metadata, "duration_seconds", "N/A")
            duration = getattr(metadata, "call_duration_secs", fallback_duration)
            fallback_start = getattr(metadata, "started_at", "N/A")
            started_at = getattr(metadata, "start_time_unix_secs", fallback_start)
            response_text += (
                f"\n\nMetadata:\nDuration: {duration} seconds\nStarted: {started_at}"
            )

        analysis = response.analysis
        if analysis:
            analysis_summary = getattr(
                analysis, "summary", "Analysis available but no summary"
            )
            response_text += f"\n\nAnalysis:\n{analysis_summary}"

        return TextContent(type="text", text=response_text)

    except Exception as e:
        make_error(f"Failed to fetch conversation: {str(e)}")
        # satisfies type checker
        return TextContent(type="text", text="")


@mcp.tool(
    description="""Lists agent conversations. Returns: conversation list with metadata. Use when: asked about conversation history.

    Args:
        agent_id (str, optional): Filter conversations by specific agent ID
        cursor (str, optional): Pagination cursor for retrieving next page of results
        call_start_before_unix (int, optional): Filter conversations that started before this Unix timestamp
        call_start_after_unix (int, optional): Filter conversations that started after this Unix timestamp
        page_size (int, optional): Number of conversations to return per page (1-100, defaults to 30)
        max_length (int, optional): Maximum character length of the response text (defaults to 10000)
    """
)
def list_conversations(
    agent_id: str | None = None,
    cursor: str | None = None,
    call_start_before_unix: int | None = None,
    call_start_after_unix: int | None = None,
    page_size: int = 30,
    max_length: int = 10000,
) -> TextContent:
    """List conversations with filtering options.

    Args:
        agent_id: Restrict results to this agent when given.
        cursor: Pagination cursor taken from a previous call.
        call_start_before_unix: Upper bound (Unix seconds) on call start time.
        call_start_after_unix: Lower bound (Unix seconds) on call start time.
        page_size: Requested page size; clamped into the range 1-100.
        max_length: Character budget handed to ``handle_large_text``.

    Returns:
        TextContent with a pagination summary followed by one entry per
        conversation, or "No conversations found." when the page is empty.
    """
    # Clamp both ends of the documented 1-100 range so that zero or negative
    # values are never forwarded to the API.
    page_size = max(1, min(page_size, 100))

    try:
        response = client.conversational_ai.conversations.list(
            cursor=cursor,
            agent_id=agent_id,
            call_start_before_unix=call_start_before_unix,
            call_start_after_unix=call_start_after_unix,
            page_size=page_size,
        )

        if not response.conversations:
            return TextContent(type="text", text="No conversations found.")

        conv_list = []
        for conv in response.conversations:
            # Rendered with datetime.fromtimestamp, i.e. in the server's
            # local timezone.
            start_time = datetime.fromtimestamp(conv.start_time_unix_secs).strftime(
                "%Y-%m-%d %H:%M:%S"
            )

            conv_info = f"""Conversation ID: {conv.conversation_id}
Status: {conv.status}
Agent: {conv.agent_name or 'N/A'} (ID: {conv.agent_id})
Started: {start_time}
Duration: {conv.call_duration_secs} seconds
Messages: {conv.message_count}
Call Successful: {conv.call_successful}"""

            conv_list.append(conv_info)

        formatted_list = "\n\n".join(conv_list)

        pagination_info = f"Showing {len(response.conversations)} conversations"
        if response.has_more:
            pagination_info += f" (more available, next cursor: {response.next_cursor})"

        full_text = f"{pagination_info}\n\n{formatted_list}"

        # Use utility to handle large text content
        result_text = handle_large_text(full_text, max_length, "conversation list")

        # When the helper returned something other than the full text (content
        # was saved to file), put the pagination info back at the top so the
        # next cursor is still visible.
        if result_text != full_text:
            result_text = f"{pagination_info}\n\n{result_text}"

        return TextContent(type="text", text=result_text)

    except Exception as e:
        make_error(f"Failed to list conversations: {str(e)}")
        # This line is unreachable but satisfies type checker
        return TextContent(type="text", text="")


@mcp.tool(
    description=f"""Transform audio from one voice to another using provided audio files. {get_output_mode_description(output_mode)}.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.
    """
)
def speech_to_speech(
    input_file_path: str,
    voice_name: str = "Adam",
    output_directory: str | None = None,
) -> Union[TextContent, EmbeddedResource]:
    """Re-voice an audio file using the voice whose name equals ``voice_name``.

    Args:
        input_file_path: Path of the source audio file.
        voice_name: Exact name of the target voice.
        output_directory: Optional directory passed to ``make_output_path``.

    Returns:
        Whatever ``handle_output_mode`` produces for the configured output mode.
    """
    search_result = client.voices.search(search=voice_name)
    candidates = search_result.voices

    if len(candidates) == 0:
        make_error("No voice found with that name.")

    # The search may return several hits; only an exact name match is accepted.
    matched_voice = None
    for candidate in candidates:
        if candidate.name == voice_name:
            matched_voice = candidate
            break

    if matched_voice is None:
        make_error(f"Voice with name: {voice_name} does not exist.")

    assert matched_voice is not None  # Type assertion for type checker
    source_path = handle_input_file(input_file_path)
    output_path = make_output_path(output_directory, base_path)
    output_file_name = make_output_file("sts", source_path.name, "mp3")

    with source_path.open("rb") as source_file:
        source_audio = source_file.read()

    converted_chunks = client.speech_to_speech.convert(
        model_id="eleven_multilingual_sts_v2",
        voice_id=matched_voice.voice_id,
        audio=source_audio,
    )

    # The converter yields the audio in chunks; concatenate them into one blob.
    converted_audio = b"".join(converted_chunks)

    # Handle different output modes
    return handle_output_mode(
        converted_audio, output_path, output_file_name, output_mode
    )


@mcp.tool(
    description=f"""Create voice previews from a text prompt. Creates three previews with slight variations. {get_output_mode_description(output_mode)}.
    
    If no text is provided, the tool will auto-generate text.

    Voice preview files are saved as: voice_design_(generated_voice_id)_(timestamp).mp3

    Example file name: voice_design_Ya2J5uIa5Pq14DNPsbC1_20250403_164949.mp3

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.
    """
)
def text_to_voice(
    voice_description: str,
    text: str | None = None,
    output_directory: str | None = None,
) -> list[EmbeddedResource] | TextContent:
    """Generate voice previews for a description and emit each one.

    Args:
        voice_description: Prompt describing the desired voice; must be non-empty.
        text: Text for the previews to speak. When ``None`` the API is asked
            to auto-generate text.
        output_directory: Optional directory passed to ``make_output_path``.

    Returns:
        The combined result from ``handle_multiple_files_output_mode``, which
        also receives the list of generated voice IDs as additional info.
    """
    if voice_description == "":
        make_error("Voice description is required.")

    preview_response = client.text_to_voice.create_previews(
        voice_description=voice_description,
        text=text,
        auto_generate_text=text is None,
    )

    output_path = make_output_path(output_directory, base_path)

    generated_voice_ids = []
    results = []

    for preview in preview_response.previews:
        voice_id = preview.generated_voice_id
        file_name = make_output_file("voice_design", voice_id, "mp3", full_id=True)
        generated_voice_ids.append(voice_id)

        # Preview audio arrives base64-encoded.
        preview_audio = base64.b64decode(preview.audio_base_64)

        # Handle different output modes
        results.append(
            handle_output_mode(preview_audio, output_path, file_name, output_mode)
        )

    # Use centralized multiple files output handling
    id_summary = ", ".join(generated_voice_ids)
    additional_info = f"Generated voice IDs are: {id_summary}"
    return handle_multiple_files_output_mode(results, output_mode, additional_info)


@mcp.tool(
    description="""Add a generated voice to the voice library. Uses the voice ID from the `text_to_voice` tool.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.
    """
)
def create_voice_from_preview(
    generated_voice_id: str,
    voice_name: str,
    voice_description: str,
) -> TextContent:
    """Save a previously generated preview voice to the voice library.

    Args:
        generated_voice_id: ID of the preview voice to save.
        voice_name: Name to give the new voice.
        voice_description: Description to store with the new voice.

    Returns:
        TextContent confirming the created voice's name and ID.
    """
    created_voice = client.text_to_voice.create_voice_from_preview(
        voice_name=voice_name,
        voice_description=voice_description,
        generated_voice_id=generated_voice_id,
    )

    confirmation = (
        f"Success. Voice created: {created_voice.name} with ID:{created_voice.voice_id}"
    )
    return TextContent(type="text", text=confirmation)


def _get_phone_number_by_id(phone_number_id: str):
    """Return the account phone number whose ID equals ``phone_number_id``.

    Reports an error through ``make_error`` when no listed number matches.
    """
    all_numbers = client.conversational_ai.phone_numbers.list()
    match = next(
        (entry for entry in all_numbers if entry.phone_number_id == phone_number_id),
        None,
    )
    if match is not None:
        return match
    make_error(f"Phone number with ID {phone_number_id} not found.")


@mcp.tool(
    description="""Make an outbound call using an ElevenLabs agent. Automatically detects provider type (Twilio or SIP trunk) and uses the appropriate API.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user.

    Args:
        agent_id: The ID of the agent that will handle the call
        agent_phone_number_id: The ID of the phone number to use for the call
        to_number: The phone number to call (E.164 format: +1xxxxxxxxxx)

    Returns:
        TextContent containing information about the call
    """
)
def make_outbound_call(
    agent_id: str,
    agent_phone_number_id: str,
    to_number: str,
) -> TextContent:
    """Place an outbound call through the API matching the number's provider.

    Args:
        agent_id: Agent that will handle the call.
        agent_phone_number_id: ID of the phone number to call from.
        to_number: Destination phone number.

    Returns:
        TextContent naming the provider used and echoing the API response.
    """
    # Look the number up first: its provider decides which API is called.
    phone_number = _get_phone_number_by_id(agent_phone_number_id)
    provider = phone_number.provider.lower()

    call_kwargs = {
        "agent_id": agent_id,
        "agent_phone_number_id": agent_phone_number_id,
        "to_number": to_number,
    }

    if provider == "twilio":
        response = client.conversational_ai.twilio.outbound_call(**call_kwargs)
        provider_info = "Twilio"
    elif provider == "sip_trunk":
        response = client.conversational_ai.sip_trunk.outbound_call(**call_kwargs)
        provider_info = "SIP trunk"
    else:
        make_error(f"Unsupported provider type: {phone_number.provider}")

    message = f"Outbound call initiated via {provider_info}: {response}."
    return TextContent(type="text", text=message)


@mcp.tool(
    description="""Search for a voice across the entire ElevenLabs voice library.

    Args:
        page: Page number to return (0-indexed)
        page_size: Number of voices to return per page (1-100)
        search: Search term to filter voices by

    Returns:
        TextContent containing information about the shared voices
    """
)
def search_voice_library(
    page: int = 0,
    page_size: int = 10,
    search: str | None = None,
) -> TextContent:
    """Query the shared voice library and format the matches as text.

    Args:
        page: Page index forwarded to the API.
        page_size: Page size forwarded to the API.
        search: Optional search term forwarded to the API.

    Returns:
        TextContent listing each voice's attributes, or a "no shared voices"
        message when the response contains none.
    """
    response = client.voices.get_shared(
        page=page,
        page_size=page_size,
        search=search,
    )

    if not response.voices:
        return TextContent(
            type="text", text="No shared voices found with the specified criteria."
        )

    # (label, attribute) pairs emitted only when the attribute exists and is
    # truthy; the order here is the order in the output.
    optional_fields = (
        ("Gender", "gender"),
        ("Age", "age"),
        ("Accent", "accent"),
        ("Description", "description"),
        ("Use Case", "use_case"),
    )

    entries = []
    for voice in response.voices:
        verified = getattr(voice, "verified_languages", None)
        if verified:
            rendered_languages = []
            for lang in verified:
                accent = getattr(lang, "accent", None)
                suffix = f" ({accent})" if accent else ""
                rendered_languages.append(f"{lang.language}{suffix}")
            language_info = ", ".join(rendered_languages)
        else:
            language_info = "N/A"

        lines = [
            f"Name: {voice.name}",
            f"ID: {voice.voice_id}",
            f"Category: {getattr(voice, 'category', 'N/A')}",
        ]
        for label, attribute in optional_fields:
            value = getattr(voice, attribute, None)
            if value:
                lines.append(f"{label}: {value}")

        lines.append(f"Languages: {language_info}")

        preview_url = getattr(voice, "preview_url", None)
        if preview_url:
            lines.append(f"Preview URL: {preview_url}")

        entries.append("\n".join(lines))

    formatted_info = "\n\n".join(entries)
    return TextContent(type="text", text=f"Shared Voices:\n\n{formatted_info}")


@mcp.tool(description="List all phone numbers associated with the ElevenLabs account")
def list_phone_numbers() -> TextContent:
    """Format every phone number on the account as a text listing.

    Returns:
        TextContent with one entry per phone number (number, ID, provider,
        label, assigned agent), or "No phone numbers found." when the API
        returns nothing.
    """
    phone_numbers = client.conversational_ai.phone_numbers.list()

    if not phone_numbers:
        return TextContent(type="text", text="No phone numbers found.")

    def describe(phone) -> str:
        # Numbers without an assigned agent are shown with the literal "None".
        agent = phone.assigned_agent
        if agent:
            assigned_agent = f"{agent.agent_name} (ID: {agent.agent_id})"
        else:
            assigned_agent = "None"
        return "\n".join(
            [
                f"Phone Number: {phone.phone_number}",
                f"ID: {phone.phone_number_id}",
                f"Provider: {phone.provider}",
                f"Label: {phone.label}",
                f"Assigned Agent: {assigned_agent}",
            ]
        )

    formatted_info = "\n\n".join([describe(phone) for phone in phone_numbers])
    return TextContent(type="text", text=f"Phone Numbers:\n\n{formatted_info}")


@mcp.tool(description="Play an audio file. Supports WAV and MP3 formats.")
def play_audio(input_file_path: str) -> TextContent:
    """Play an audio file through the ElevenLabs ``play`` helper.

    Args:
        input_file_path: Path of the audio file, resolved by
            ``handle_input_file``.

    Returns:
        TextContent confirming which file was played.
    """
    file_path = handle_input_file(input_file_path)

    # Read inside a context manager so the file handle is closed promptly
    # instead of being left for the garbage collector.
    with open(file_path, "rb") as audio_file:
        audio_bytes = audio_file.read()

    play(audio_bytes, use_ffmpeg=False)
    return TextContent(type="text", text=f"Successfully played audio file: {file_path}")


@mcp.tool(
    description="""Convert a prompt to music and save the output audio file to a given directory.
    Directory is optional, if not provided, the output file will be saved to $HOME/Desktop.

    Args:
        prompt: Prompt to convert to music. Must provide either prompt or composition_plan.
        output_directory: Directory to save the output audio file
        composition_plan: Composition plan to use for the music. Must provide either prompt or composition_plan.
        music_length_ms: Length of the generated music in milliseconds. Cannot be used if composition_plan is provided.

    ⚠️ COST WARNING: This tool makes an API call to ElevenLabs which may incur costs. Only use when explicitly requested by the user."""
)
def compose_music(
    prompt: str | None = None,
    output_directory: str | None = None,
    composition_plan: MusicPrompt | None = None,
    music_length_ms: int | None = None,
) -> Union[TextContent, EmbeddedResource]:
    """Generate music from either a prompt or a composition plan.

    Exactly one of ``prompt`` and ``composition_plan`` must be supplied, and
    ``music_length_ms`` may not be combined with ``composition_plan``;
    violations are reported through ``make_error``.

    Args:
        prompt: Text prompt describing the music.
        output_directory: Optional directory passed to ``make_output_path``.
        composition_plan: Composition plan to generate from.
        music_length_ms: Requested length in milliseconds.

    Returns:
        Whatever ``handle_output_mode`` produces for the configured output mode.
    """
    has_prompt = prompt is not None
    has_plan = composition_plan is not None

    if not has_prompt and not has_plan:
        make_error(
            f"Either prompt or composition_plan must be provided. Prompt: {prompt}"
        )

    if has_prompt and has_plan:
        make_error("Only one of prompt or composition_plan must be provided")

    if has_plan and music_length_ms is not None:
        make_error("music_length_ms cannot be used if composition_plan is provided")

    output_path = make_output_path(output_directory, base_path)
    output_file_name = make_output_file("music", "", "mp3")

    music_chunks = client.music.compose(
        prompt=prompt,
        music_length_ms=music_length_ms,
        composition_plan=composition_plan,
    )

    # The API yields audio in chunks; concatenate them into one blob.
    music_bytes = b"".join(music_chunks)

    # Handle different output modes
    return handle_output_mode(music_bytes, output_path, output_file_name, output_mode)


@mcp.tool(
    description="""Create a composition plan for music generation. Usage of this endpoint does not cost any credits but is subject to rate limiting depending on your tier. Composition plans can be used when generating music with the compose_music tool.

    Args:
        prompt: Prompt to create a composition plan for
        music_length_ms: The length of the composition plan to generate in milliseconds. Must be between 10000ms and 300000ms. Optional - if not provided, the model will choose a length based on the prompt.
        source_composition_plan: An optional composition plan to use as a source for the new composition plan
    """
)
def create_composition_plan(
    prompt: str,
    music_length_ms: int | None = None,
    source_composition_plan: MusicPrompt | None = None,
) -> MusicPrompt:
    """Ask the API for a composition plan and return it unchanged.

    Args:
        prompt: Prompt describing the music to plan.
        music_length_ms: Optional target length in milliseconds.
        source_composition_plan: Optional existing plan to base the new one on.

    Returns:
        The composition plan returned by the API.
    """
    plan_request = {
        "prompt": prompt,
        "music_length_ms": music_length_ms,
        "source_composition_plan": source_composition_plan,
    }
    return client.music.composition_plan.create(**plan_request)


def main():
    """Run the MCP server."""
    import sys

    # mcp.run() is called without a transport argument; FastMCP's default
    # transport is stdio, where stdout carries protocol messages. Send the
    # startup banner to stderr so it cannot corrupt that stream.
    print("Starting MCP server", file=sys.stderr)
    mcp.run()


# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
